package com.atguigu.app.func;

import com.alibaba.druid.pool.DruidDataSource;
import com.alibaba.druid.pool.DruidPooledConnection;
import com.alibaba.fastjson.JSONObject;
import com.atguigu.common.GmallConfig;
import com.atguigu.utils.DimUtil;
import com.atguigu.utils.DruidDSUtil;
import com.atguigu.utils.JedisPoolUtil;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;

import java.sql.PreparedStatement;
import java.util.Collection;
import java.util.Set;

/**
 * Flink sink that writes one dimension record per call into the table named by the
 * record's {@code sinkTable} field, using an {@code upsert} statement executed through a
 * pooled JDBC connection. For records whose {@code type} is {@code "update"} the matching
 * Redis entry is deleted before the write.
 */
public class DimSinkFunction extends RichSinkFunction<JSONObject> {

    // Both pools are populated in open() and never travel with the serialized function,
    // hence transient.
    private transient DruidDataSource dataSource;
    private transient JedisPool jedisPool;

    /**
     * Acquires the JDBC data source and the Redis pool used by {@link #invoke}.
     *
     * @param parameters Flink configuration (unused here)
     * @throws Exception if either pool cannot be obtained
     */
    @Override
    public void open(Configuration parameters) throws Exception {
        dataSource = DruidDSUtil.createDataSource();
        jedisPool = JedisPoolUtil.getJedisPool();
    }

    /**
     * Releases the data source obtained in {@link #open}.
     *
     * <p>NOTE(review): assumes {@code DruidDSUtil.createDataSource()} returns a pool owned by
     * this instance - confirm it is not shared. The Redis pool is left untouched because it
     * is fetched through a getter and may be shared.
     *
     * @throws Exception if the superclass close fails
     */
    @Override
    public void close() throws Exception {
        if (dataSource != null) {
            dataSource.close();
            dataSource = null;
        }
        super.close();
    }

    /**
     * Upserts the {@code data} object of one record into its sink table.
     *
     * <p>Example record:
     * {@code {"database":"gmall","table":"base_trademark","type":"insert","ts":1592270938,
     * "xid":13090,"xoffset":1573,"data":{"id":"12","tm_name":"atguigu"},
     * "sinkTable":"dim_base_trademark"}}
     *
     * @param value   record carrying {@code sinkTable}, {@code type} and {@code data}
     * @param context sink context (unused)
     * @throws Exception if the Redis delete or the JDBC write fails; every resource acquired
     *                   for this record is released before the exception propagates
     */
    @Override
    public void invoke(JSONObject value, Context context) throws Exception {

        // Build the statement: upsert into db.tn(id,tm_name) values (?,?)
        String sinkTable = value.getString("sinkTable");
        JSONObject data = value.getJSONObject("data");
        String upsertSql = genUpsertSql(sinkTable, data);

        // Same iteration order as the keySet() used for the column list, because both are
        // views of the same unmodified map.
        Collection<Object> values = data.values();

        System.out.println("upsertSql>>>>>>>>" + upsertSql + " " + values);

        // try-with-resources returns the connection and statement to the pool even when the
        // Redis call, execute() or commit() throws.
        try (DruidPooledConnection connection = dataSource.getConnection();
             PreparedStatement preparedStatement = connection.prepareStatement(upsertSql)) {

            // Bind every value as a string. Null becomes "", which is what the previous
            // literal-based statement produced for a null value.
            int index = 1;
            for (Object columnValue : values) {
                preparedStatement.setString(index++, columnValue == null ? "" : columnValue.toString());
            }

            // For updates, drop the Redis entry before writing the new row.
            if ("update".equals(value.getString("type"))) {
                try (Jedis jedis = jedisPool.getResource()) {
                    DimUtil.delRedisDimInfo(jedis,
                            sinkTable.toUpperCase(),
                            data.getString("id")
                    );
                }
            }

            // Execute the write and commit it.
            preparedStatement.execute();
            connection.commit();
        }
    }

    /**
     * Builds the parameterized upsert statement for one record.
     *
     * <p>Values are deliberately not inlined: a value containing a single quote would
     * otherwise terminate the literal and corrupt the statement. Table and column names
     * cannot be bound as parameters and are still concatenated.
     *
     * @param sinkTable target table, e.g. {@code dim_base_trademark}
     * @param data      column/value pairs, e.g. {@code {"id":"12","tm_name":"atguigu"}}
     * @return e.g. {@code upsert into db.dim_base_trademark(id,tm_name) values (?,?)}
     */
    private String genUpsertSql(String sinkTable, JSONObject data) {

        Set<String> columns = data.keySet();

        // One "?" per column, comma separated: 2 columns ===> "?,?"
        String placeholders = StringUtils.repeat("?", ",", columns.size());

        // StringUtils.join(columns, ",") ===> "id,tm_name"
        return "upsert into " + GmallConfig.HBASE_SCHEMA + "." + sinkTable + "(" +
                StringUtils.join(columns, ",") + ") values (" +
                placeholders + ")";
    }
}
